Tutorial - Generate a DomoStats dataset using DomoLibrary

DomoStats
MagicETL
Author

Jae Wilson

Published

November 28, 2023

# pip install --upgrade domolibrary
import domolibrary

domolibrary.__version__
'4.2.105'

Retrieve Authentication credentials

Don’t store your password on the internet!!!

import os

domo_instance = os.environ['DOMO_INSTANCE']
domo_access_token = os.environ['DOMO_ACCESS_TOKEN']

TEST_DATAFLOW_ID = 108
import domolibrary.client.DomoAuth as dmda

auth = dmda.DomoTokenAuth(
    domo_instance=domo_instance,
    domo_access_token=domo_access_token
)

await auth.print_is_token()

assert auth.token
🎉 token_auth token retrieved from domo-community ⚙️

Retrieve Data

  • How do you handle looping?

  • How do you handle ‘fast’ code execution? (asynchronous code execution)

  • Will one request be enough?

    • get all dataflow
    • for each dataflow retrieve the correct information
import domolibrary.classes.DomoDatacenter as dmdc
from typing import List


async def get_dataflow_ids(auth: dmda.DomoAuth) -> List[int]:
    """Search the Domo datacenter and return the ids of all Magic ETL v2 dataflows.

    Args:
        auth: authenticated DomoAuth used for the datacenter search.

    Returns:
        list of dataflow ids as ints.
    """

    domo_datacenter = dmdc.DomoDatacenter(auth=auth)

    # restrict the datacenter search to Magic ETL v2 dataflows
    dataflows_ls = await domo_datacenter.search_datacenter(
        auth=auth,
        entity_type="DATAFLOW",
        additional_filters_ls=[
            {
                "filterType": "term",
                "field": "data_flow_type",
                "value": "MAGIC",
                "name": "Magic ETL v2",
                "not": False,
            }
        ],
    )

    # search results store databaseId as a string (e.g. '131');
    # cast to int so the return value matches the declared List[int]
    return [int(dataflow_obj["databaseId"]) for dataflow_obj in dataflows_ls]


dataflow_ids = await get_dataflow_ids(auth)
dataflow_ids[0:5]
['131', '227', '37', '185', '38']
import domolibrary.classes.DomoDataflow as dmdf
import domolibrary.utils.chunk_execution as ce
from typing import List


async def get_dataflows(
    dataflow_ids_ls: List[int],
    dataflow_auth: dmda.DomoAuth = None,  # defaults to the notebook-level auth
) -> List[dmdf.DomoDataflow]:
    """Retrieve dataflow metadata for a list of dataflow ids.

    Requests run asynchronously, at most 20 at a time.

    Args:
        dataflow_ids_ls: ids of the dataflows to retrieve.
        dataflow_auth: optional auth override; when omitted, falls back to the
            module-level ``auth`` (preserves the original notebook behavior).

    Returns:
        list of DomoDataflow objects, one per id.
    """

    if dataflow_auth is None:
        # fall back to the global auth defined earlier in the notebook
        dataflow_auth = auth

    return await ce.gather_with_concurrency(
        *[
            dmdf.DomoDataflow.get_by_id(dataflow_id=dataflow_id, auth=dataflow_auth)
            for dataflow_id in dataflow_ids_ls
        ],
        n=20
    )


domo_dataflow_ls = await get_dataflows([dataflow_ids[0]])
domo_dataflow = domo_dataflow_ls[0]
domo_dataflow
DomoDataflow(id=131, name='BeastModesGovernance_01_AccumulateHistory', owner=None, description=None, tags=None, actions=[DomoDataflow_Action(id='40554fc4-428a-4117-8820-b3f4f4927021', type='LoadFromVault', name='Governance_Beast Modes', datasource_id='b7998b45-ccb5-4183-830e-8c2bdd489fd0', sql=None, depends_on=None, parent_actions=None), DomoDataflow_Action(id='05082103-d42e-49ab-a66e-bd31b4c0e3df', type='ExpressionEvaluator', name='Add Formula', datasource_id=None, sql=None, depends_on=['40554fc4-428a-4117-8820-b3f4f4927021'], parent_actions=[DomoDataflow_Action(id='40554fc4-428a-4117-8820-b3f4f4927021', type='LoadFromVault', name='Governance_Beast Modes', datasource_id='b7998b45-ccb5-4183-830e-8c2bdd489fd0', sql=None, depends_on=None, parent_actions=None)]), DomoDataflow_Action(id='fe77761b-eb11-4491-b19c-ebb56d009b76', type='PublishToVault', name='Governance_BeastMode_historical', datasource_id='e6559952-30ce-41a1-b831-f29ebe95d6e5', sql=None, depends_on=['05082103-d42e-49ab-a66e-bd31b4c0e3df'], parent_actions=[DomoDataflow_Action(id='05082103-d42e-49ab-a66e-bd31b4c0e3df', type='ExpressionEvaluator', name='Add Formula', datasource_id=None, sql=None, depends_on=['40554fc4-428a-4117-8820-b3f4f4927021'], parent_actions=[DomoDataflow_Action(id='40554fc4-428a-4117-8820-b3f4f4927021', type='LoadFromVault', name='Governance_Beast Modes', datasource_id='b7998b45-ccb5-4183-830e-8c2bdd489fd0', sql=None, depends_on=None, parent_actions=None)])])], version_id=None, version_number=None, versions=None, History=DomoDataflow_History(execution_history=None), Lineage=DomoLineage(parent_type='DATAFLOW', parent_id=131))
import pandas as pd


async def generate_version_action_pdf(domo_dataflow: dmdf.DomoDataflow) -> pd.DataFrame:
    """Return one row per (dataflow version, tile) from a dataflow's version history.

    Args:
        domo_dataflow: the dataflow whose definition history to expand.

    Returns:
        DataFrame with dataflow_id / dataflow_version / dataflow_name plus the
        tile's attributes (id renamed tile_id, type renamed tile_type);
        empty DataFrame when the dataflow has no versions.
    """

    dataflow_versions = await domo_dataflow.get_versions()

    # guard on the value we just retrieved rather than the side-effect
    # attribute domo_dataflow.versions, so the check cannot drift out of sync
    # with what get_versions actually returned
    if not dataflow_versions:
        return pd.DataFrame()

    df = pd.DataFrame(
        [
            {
                "dataflow_id": dataflow_version.id,
                "dataflow_version": dataflow_version.version_id,
                "dataflow_name": dataflow_version.name,
                **domo_action.__dict__,
            }
            for dataflow_version in dataflow_versions
            # tolerate versions whose actions list is missing
            for domo_action in (dataflow_version.actions or [])
        ]
    )

    # no rows means no version had actions -- drop/rename would KeyError
    if df.empty:
        return df

    # parent_actions / datasource_id / sql are verbose and unused downstream
    df.drop(columns=["parent_actions", "datasource_id", "sql"], inplace=True)
    df.rename(columns={"id": "tile_id", "type": "tile_type"}, inplace=True)

    return df


await generate_version_action_pdf(domo_dataflow)
dataflow_id dataflow_version dataflow_name tile_id tile_type name depends_on
0 131 520 BeastModesGovernance_01_AccumulateHistory 40554fc4-428a-4117-8820-b3f4f4927021 LoadFromVault Governance_Beast Modes None
1 131 520 BeastModesGovernance_01_AccumulateHistory 05082103-d42e-49ab-a66e-bd31b4c0e3df ExpressionEvaluator Add Formula [40554fc4-428a-4117-8820-b3f4f4927021]
2 131 520 BeastModesGovernance_01_AccumulateHistory fe77761b-eb11-4491-b19c-ebb56d009b76 PublishToVault Governance_BeastMode_historical [05082103-d42e-49ab-a66e-bd31b4c0e3df]
3 131 519 New ETL Transform 40554fc4-428a-4117-8820-b3f4f4927021 LoadFromVault Governance_Beast Modes None
4 131 519 New ETL Transform 05082103-d42e-49ab-a66e-bd31b4c0e3df ExpressionEvaluator Add Formula [40554fc4-428a-4117-8820-b3f4f4927021]
5 131 519 New ETL Transform fe77761b-eb11-4491-b19c-ebb56d009b76 PublishToVault Governance_BeastMode_historical [05082103-d42e-49ab-a66e-bd31b4c0e3df]
6 131 518 New ETL Transform 40554fc4-428a-4117-8820-b3f4f4927021 LoadFromVault Governance_Beast Modes None
7 131 518 New ETL Transform fe77761b-eb11-4491-b19c-ebb56d009b76 PublishToVault Governance_BeastMode_historical [05082103-d42e-49ab-a66e-bd31b4c0e3df]
8 131 518 New ETL Transform 05082103-d42e-49ab-a66e-bd31b4c0e3df ExpressionEvaluator Add Formula [40554fc4-428a-4117-8820-b3f4f4927021]
async def generate_action_stats_df(
    domo_dataflow: dmdf.DomoDataflow,
    maximum_history: int = 10,  # number of executions to look back
) -> pd.DataFrame:
    """Return execution history by tile: one row per (execution, action result).

    Args:
        domo_dataflow: the dataflow whose execution history to expand.
        maximum_history: how many past executions to retrieve.

    Returns:
        DataFrame of tile-level execution stats; empty DataFrame when the
        dataflow has never been executed.
    """

    # check History BEFORE dereferencing it -- the original guard ran after
    # the await, so a missing History object would have raised AttributeError
    if not domo_dataflow.History:
        print(
            f"⚠️ dataflow {domo_dataflow.id} - {domo_dataflow.name} has never been executed"
        )
        return pd.DataFrame()

    await domo_dataflow.History.get_execution_history(maximum=maximum_history)

    # empty list and None both mean "no executions"
    if not domo_dataflow.History.execution_history:
        print(
            f"⚠️ dataflow {domo_dataflow.id} - {domo_dataflow.name} has never been executed"
        )
        return pd.DataFrame()

    # keep only executions that produced per-tile action results
    history_ls = [
        domo_history
        for domo_history in domo_dataflow.History.execution_history
        if domo_history.action_results
    ]

    df = pd.DataFrame(
        [
            {
                "dataflow_execution_id": domo_history.dataflow_execution_id,
                "dataflow_version": domo_history.dataflow_version,
                "dataflow_id": domo_history.dataflow_id,
                "dataflow_begin_time": domo_history.begin_time,
                **domo_action.__dict__,
            }
            for domo_history in history_ls
            for domo_action in domo_history.action_results
        ]
    )

    # every execution may lack action results -- drop/rename would KeyError
    if df.empty:
        return df

    df.drop(columns=["name"], inplace=True)

    df.rename(columns={"id": "tile_id", "type": "tile_type"}, inplace=True)

    return df

stats_df = None
try:
    stats_df = await generate_action_stats_df(domo_dataflow)
except Exception as e:
    print(e)

stats_df
dataflow_execution_id dataflow_version dataflow_id dataflow_begin_time tile_id tile_type is_success rows_processed begin_time end_time duration_in_sec
0 4759ab00-3f9d-4d55-9087-32aa38655a51 519 131 2023-03-07 00:07:39 40554fc4-428a-4117-8820-b3f4f4927021 DataHubManifestLoaderAction True 1239 2023-03-07 00:07:46.011 2023-03-07 00:07:46.919 0.908
1 4759ab00-3f9d-4d55-9087-32aa38655a51 519 131 2023-03-07 00:07:39 05082103-d42e-49ab-a66e-bd31b4c0e3df ExpressionEvaluator True 1239 2023-03-07 00:07:46.630 2023-03-07 00:07:46.919 0.289
2 4759ab00-3f9d-4d55-9087-32aa38655a51 519 131 2023-03-07 00:07:39 fe77761b-eb11-4491-b19c-ebb56d009b76 PublishToVault True 1239 2023-03-07 00:07:46.631 2023-03-07 00:07:47.434 0.803
3 99808eee-9d74-4c9f-8256-17dff1fe44fe 518 131 2023-03-06 23:57:25 40554fc4-428a-4117-8820-b3f4f4927021 DataHubManifestLoaderAction True 1238 2023-03-06 23:57:33.196 2023-03-06 23:57:33.999 0.803
4 99808eee-9d74-4c9f-8256-17dff1fe44fe 518 131 2023-03-06 23:57:25 05082103-d42e-49ab-a66e-bd31b4c0e3df ExpressionEvaluator True 1238 2023-03-06 23:57:33.872 2023-03-06 23:57:33.999 0.127
5 99808eee-9d74-4c9f-8256-17dff1fe44fe 518 131 2023-03-06 23:57:25 fe77761b-eb11-4491-b19c-ebb56d009b76 PublishToVault True 1238 2023-03-06 23:57:33.873 2023-03-06 23:57:34.520 0.647
async def process_instance(auth):
    """End-to-end scan of one Domo instance: find every Magic ETL dataflow and
    build the tile-definition and tile-execution-stats DataFrames.

    Args:
        auth: authenticated DomoAuth for the instance to scan.

    Returns:
        tuple (actions_df, stats_df) -- empty DataFrames when the instance
        has no Magic ETL dataflows.
    """

    dataflow_ids = await get_dataflow_ids(auth)

    domo_dataflows = await get_dataflows(dataflow_ids)

    actions_df_ls = await ce.gather_with_concurrency(
        *[
            generate_version_action_pdf(domo_dataflow)
            for domo_dataflow in domo_dataflows
        ],
        n=20
    )

    stats_df_ls = await ce.gather_with_concurrency(
        *[generate_action_stats_df(domo_dataflow) for domo_dataflow in domo_dataflows],
        n=20
    )

    # pd.concat raises ValueError on an empty list -- guard for instances
    # that have no Magic ETL dataflows at all
    actions_df = pd.concat(actions_df_ls) if actions_df_ls else pd.DataFrame()

    stats_df = pd.concat(stats_df_ls) if stats_df_ls else pd.DataFrame()

    # import domojupyter as dj
    # dj.write_dataframe(actions_df, 'MONIT_Dataflow_Tiles')
    # dj.write_dataframe(stats_df, 'MONIT_Dataflow_Stats')

    return actions_df, stats_df


base_actions_df, base_stats_df = await process_instance(auth)
⚠️ dataflow 69 - Copy of gov_datasets_INT has never been executed
⚠️ dataflow 27 - JW_Simple Sample Set_Rolling Averages has never been executed
⚠️ dataflow 394 - ETL Forecasting has never been executed
⚠️ dataflow 85 - Working Hours Test has never been executed
⚠️ dataflow 322 - Forecast Sales - HV has never been executed
⚠️ dataflow 323 - New ETL Transform has never been executed
⚠️ dataflow 83 - Working Hours Example has never been executed
⚠️ dataflow 410 - Domain Visits - SEA has never been executed
def generate_datasets(actions_df, stats_df, dataflow_id=None, execution_id=None):
    """Join tile definitions to execution stats: one fact row per
    (execution, tile, parent tile).

    Args:
        actions_df: tile definitions (output of generate_version_action_pdf).
        stats_df: tile execution stats (output of generate_action_stats_df).
        dataflow_id: optional -- restrict both frames to one dataflow.
        execution_id: optional -- restrict stats to one execution.

    Returns:
        DataFrame where each tile row carries its parent tile's stats, the
        actual wall-clock duration (end_time - parent end_time), and a
        per-tile rank of which parent delayed it the most.
    """

    ### filter and configure actions_df
    actions_df = actions_df.copy()
    # compare against None explicitly so a falsy id (e.g. 0) still filters
    if dataflow_id is not None:
        actions_df = actions_df[actions_df["dataflow_id"] == dataflow_id]
    actions_df.rename(columns={"name": "tile_name"}, inplace=True)

    #### filter and configure stats_df
    stats_df = stats_df.copy()
    if execution_id is not None:
        stats_df = stats_df[stats_df["dataflow_execution_id"] == execution_id]
    if dataflow_id is not None:
        stats_df = stats_df[stats_df["dataflow_id"] == dataflow_id]

    # attach dataflow_name / tile_name to each stats row; with no `on` the
    # merge keys are the shared columns: dataflow_id, dataflow_version, tile_id
    stats_df = pd.merge(
        stats_df,
        actions_df[
            ["dataflow_id", "dataflow_name", "dataflow_version", "tile_id", "tile_name"]
        ],
        how="inner",
    )

    #### explode depends_on so each (tile, parent) pair gets its own row;
    #### tiles without parents keep a single row with depends_on = None
    explode_df = actions_df[
        ["dataflow_id", "dataflow_version", "tile_id", "depends_on"]
    ].explode("depends_on")

    facts_df = stats_df[
        [
            "dataflow_name",
            "dataflow_execution_id",
            "dataflow_version",
            "dataflow_id",
            "tile_id",
            "tile_name",
            "tile_type",
            "rows_processed",
            "dataflow_begin_time",
            "begin_time",
            "end_time",
            "duration_in_sec",
        ]
    ]

    # same facts, re-labelled so they can be joined back on as the parent side
    parent_df = facts_df[
        [
            "dataflow_execution_id",
            "dataflow_version",
            "dataflow_id",
            "tile_id",
            "tile_name",
            "tile_type",
            "rows_processed",
            "end_time",
        ]
    ].rename(
        columns={
            "tile_id": "depends_on",
            "tile_name": "parent_tile_name",
            "tile_type": "parent_tile_type",
            "rows_processed": "parent_rows_processed",
            "end_time": "parent_end_time",
        }
    )

    facts_by_parent_df = pd.merge(
        facts_df,
        explode_df,
        how="inner",
        on=["dataflow_id", "tile_id", "dataflow_version"],
    )
    facts_by_parent_df = pd.merge(
        facts_by_parent_df,
        parent_df,
        how="left",
        on=["dataflow_id", "dataflow_version", "dataflow_execution_id", "depends_on"],
    ).rename(columns={"depends_on": "parent_tile_id"})

    # root tiles have no parent: treat their own begin_time as the parent end
    facts_by_parent_df["parent_end_time"] = facts_by_parent_df[
        "parent_end_time"
    ].fillna(facts_by_parent_df["begin_time"])

    # true wall-clock wait: a tile cannot start before its parent finished
    facts_by_parent_df["actual_duration_in_sec"] = (
        facts_by_parent_df["end_time"] - facts_by_parent_df["parent_end_time"]
    ).dt.total_seconds()
    # rank 1 = the parent this tile waited on the longest
    facts_by_parent_df["tile_delay_rank"] = facts_by_parent_df.groupby(
        ["dataflow_execution_id", "dataflow_version", "dataflow_id", "tile_id"]
    )["actual_duration_in_sec"].rank(ascending=False)

    return facts_by_parent_df.sort_values(
        by=["dataflow_id", "begin_time", "parent_tile_id"], ascending=True
    )


generate_datasets(
    base_actions_df,
    base_stats_df,
    TEST_DATAFLOW_ID,
    execution_id="1e331f2b-1db8-460d-9860-334aedc88e93",
)
dataflow_name dataflow_execution_id dataflow_version dataflow_id tile_id tile_name tile_type rows_processed dataflow_begin_time begin_time end_time duration_in_sec parent_tile_id parent_tile_name parent_tile_type parent_rows_processed parent_end_time actual_duration_in_sec tile_delay_rank
0 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 dce3487e-6a2d-49a2-afd4-1f7867cb3b95 Governance_datasets DataHubManifestLoaderAction 570.0 2022-11-03 09:33:00 2022-11-03 09:33:05.717 2022-11-03 09:33:06.614 0.897 None NaN NaN NaN 2022-11-03 09:33:05.717 0.897 1.0
1 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 df709f9b-0e6f-4f69-8b66-aa7411028db8 Governance_dataflow_details DataHubManifestLoaderAction 361.0 2022-11-03 09:33:00 2022-11-03 09:33:05.717 2022-11-03 09:33:06.656 0.939 None NaN NaN NaN 2022-11-03 09:33:05.717 0.939 1.0
2 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 3b3396dd-a673-4211-868f-abbbcc4ce0a3 Datasets SelectValues 570.0 2022-11-03 09:33:00 2022-11-03 09:33:06.609 2022-11-03 09:33:06.614 0.005 dce3487e-6a2d-49a2-afd4-1f7867cb3b95 Governance_datasets DataHubManifestLoaderAction 570.0 2022-11-03 09:33:06.614 0.000 1.0
4 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 33a5f316-cb1e-4c2c-bf10-911396a5ae97 Python Script PythonEngineAction 931.0 2022-11-03 09:33:00 2022-11-03 09:33:06.610 2022-11-03 09:33:15.408 8.798 3b3396dd-a673-4211-868f-abbbcc4ce0a3 Datasets SelectValues 570.0 2022-11-03 09:33:06.614 8.794 1.0
5 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 33a5f316-cb1e-4c2c-bf10-911396a5ae97 Python Script PythonEngineAction 931.0 2022-11-03 09:33:00 2022-11-03 09:33:06.610 2022-11-03 09:33:15.408 8.798 7e839c17-3e1a-4938-86d4-1b19d61c3662 Dataflow Details SelectValues 361.0 2022-11-03 09:33:06.656 8.752 2.0
3 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 7e839c17-3e1a-4938-86d4-1b19d61c3662 Dataflow Details SelectValues 361.0 2022-11-03 09:33:00 2022-11-03 09:33:06.656 2022-11-03 09:33:06.656 0.000 df709f9b-0e6f-4f69-8b66-aa7411028db8 Governance_dataflow_details DataHubManifestLoaderAction 361.0 2022-11-03 09:33:06.656 0.000 1.0
6 Datasets_lineage 1e331f2b-1db8-460d-9860-334aedc88e93 441 108 dffae0ba-4e21-457f-b190-9af841e5f771 Datasets_lineage PublishToVault 1391.0 2022-11-03 09:33:00 2022-11-03 09:33:15.348 2022-11-03 09:33:15.490 0.142 33a5f316-cb1e-4c2c-bf10-911396a5ae97 Python Script PythonEngineAction 931.0 2022-11-03 09:33:15.408 0.082 1.0
generate_datasets(base_actions_df, base_stats_df, None, None)

# dj.write_dataframe(exploded_df , 'MONIT_Dataflow_Tiles_Exploded')
dataflow_name dataflow_execution_id dataflow_version dataflow_id tile_id tile_name tile_type rows_processed dataflow_begin_time begin_time end_time duration_in_sec parent_tile_id parent_tile_name parent_tile_type parent_rows_processed parent_end_time actual_duration_in_sec tile_delay_rank
16884 MetaData_Stage 1 8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424 19 8 d312578c-e9cb-4e3a-9dc6-662c00c93ba0 Governance_Users and Groups DataHubManifestLoaderAction 35.0 2020-07-09 16:00:07 2020-07-09 16:00:17.000 2020-07-09 16:00:21.000 4.000 None NaN NaN NaN 2020-07-09 16:00:17.000 4.000 1.0
16885 MetaData_Stage 1 8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424 19 8 d316ddf3-9917-4542-bf68-e1cd8397d894 Governance_Groups DataHubManifestLoaderAction 3.0 2020-07-09 16:00:07 2020-07-09 16:00:17.000 2020-07-09 16:00:21.000 4.000 None NaN NaN NaN 2020-07-09 16:00:17.000 4.000 1.0
16887 MetaData_Stage 1 8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424 19 8 eebdf394-fc6d-40c0-895e-b325044db506 MetaData_Groups PublishToVault 3.0 2020-07-09 16:00:07 2020-07-09 16:00:21.000 2020-07-09 16:00:22.000 1.000 0fe45cc8-7c88-48cb-a437-4656954767a6 PDP? Constant 3.0 2020-07-09 16:00:21.000 1.000 1.0
16886 MetaData_Stage 1 8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424 19 8 e048f809-08ae-49c2-84be-5b7b76a99944 MetaData_Users and Groups PublishToVault 35.0 2020-07-09 16:00:07 2020-07-09 16:00:21.000 2020-07-09 16:00:22.000 1.000 8e52e4dc-fd3a-455b-9ec1-c2a404d342d0 PDP? 1 Constant 35.0 2020-07-09 16:00:21.000 1.000 1.0
16883 MetaData_Stage 1 8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424 19 8 8e52e4dc-fd3a-455b-9ec1-c2a404d342d0 PDP? 1 Constant 35.0 2020-07-09 16:00:07 2020-07-09 16:00:21.000 2020-07-09 16:00:21.000 0.000 d312578c-e9cb-4e3a-9dc6-662c00c93ba0 Governance_Users and Groups DataHubManifestLoaderAction 35.0 2020-07-09 16:00:21.000 0.000 1.0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
18775 Sample Store Sales Flow b95e7a38-cc77-49db-9c28-5923991fe556 1551 426 e895ae72-acbf-4e7c-8b9a-3341ba8e944f Select Columns SelectValues 19378.0 2025-01-24 16:37:32 2025-01-24 16:37:41.729 2025-01-24 16:37:42.627 0.898 fd921644-a6ce-45db-9482-576ed9638f55 Alter Columns Metadata 19378.0 2025-01-24 16:37:42.627 0.000 1.0
18776 Sample Store Sales Flow b95e7a38-cc77-49db-9c28-5923991fe556 1551 426 1d6404a2-7810-4595-961e-0e6c00eb545d Sample Store Sales Flow PublishToVault 19378.0 2025-01-24 16:37:32 2025-01-24 16:37:41.736 2025-01-24 16:37:42.836 1.100 e895ae72-acbf-4e7c-8b9a-3341ba8e944f Select Columns SelectValues 19378.0 2025-01-24 16:37:42.627 0.209 1.0
18865 HeartVoronoi e55abc80-eadf-4d79-a057-49a1593a8e51 1553 427 b16e8be9-62b5-4e40-8d03-ee405886e179 HeartVoron 1 DataHubManifestLoaderAction 9643.0 2025-01-31 16:06:23 2025-01-31 16:06:28.519 2025-01-31 16:06:28.849 0.330 None NaN NaN NaN 2025-01-31 16:06:28.519 0.330 1.0
18866 HeartVoronoi e55abc80-eadf-4d79-a057-49a1593a8e51 1553 427 1d555ba4-520e-4e75-9a41-a3a3e7f417ab Python Script PythonEngineAction 9643.0 2025-01-31 16:06:23 2025-01-31 16:06:28.793 2025-01-31 16:06:31.155 2.362 b16e8be9-62b5-4e40-8d03-ee405886e179 HeartVoron 1 DataHubManifestLoaderAction 9643.0 2025-01-31 16:06:28.849 2.306 1.0
18867 HeartVoronoi e55abc80-eadf-4d79-a057-49a1593a8e51 1553 427 d9c960ce-241b-4bf3-ba55-6e8537cfd4ca HeartVoron 1 PublishToVault 1.0 2025-01-31 16:06:23 2025-01-31 16:06:31.350 2025-01-31 16:06:31.536 0.186 1d555ba4-520e-4e75-9a41-a3a3e7f417ab Python Script PythonEngineAction 9643.0 2025-01-31 16:06:31.155 0.381 1.0

21648 rows × 19 columns